package com.atguigu.flink.datastreamapi.agg;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demo showing how a single {@link ProcessFunction} can do the work of a filter, a map and a
 * flatMap in one operator: readings are filtered by sensor id, their vc is increased, and each
 * surviving reading is emitted more than once.
 *
 * <p>Created by Smexy on 2023/11/11
 */
public class Demo4_Process {

    /** Only readings whose id equals this value are forwarded downstream. */
    private static final String KEPT_SENSOR_ID = "s1";

    /** Amount added to the vc of every forwarded reading. */
    private static final int VC_INCREMENT = 10;

    /** Number of times each forwarded reading is emitted. */
    private static final int COPIES_PER_READING = 2;

    /**
     * Builds and runs the pipeline: socket text source -> {@link WaterSensorMapFunction} ->
     * {@link KeepBoostDuplicate} -> print sink.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use process() to perform the filter, then map, then flatMap steps in one operator.
        env.socketTextStream("hadoop102", 8888)
           .map(new WaterSensorMapFunction())
           .process(new KeepBoostDuplicate())
           .print();

        try {
            env.execute();
        } catch (Exception failure) {
            // Any failure from job execution is only reported on stderr; main then returns normally.
            failure.printStackTrace();
        }
    }

    /**
     * Keeps only readings from sensor {@code s1}, adds 10 to their vc, and outputs each of them
     * twice. Readings from any other sensor produce no output.
     */
    private static final class KeepBoostDuplicate extends ProcessFunction<WaterSensor, WaterSensor> {

        /**
         * Handles one incoming reading.
         *
         * @param value the element that just arrived
         * @param ctx   the processing context supplied by the framework; not used here
         * @param out   collector that receives the output elements
         */
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            // Filter step: drop everything that is not from the sensor of interest.
            if (!KEPT_SENSOR_ID.equals(value.getId())) {
                return;
            }

            // Map step: the incoming object itself is updated in place.
            value.setVc(value.getVc() + VC_INCREMENT);

            // FlatMap step: the same (already updated) instance is emitted once per copy.
            for (int copy = 0; copy < COPIES_PER_READING; copy++) {
                out.collect(value);
            }
        }
    }
}
